-
Notifications
You must be signed in to change notification settings - Fork 13.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-16909 Refactor GroupCoordinatorConfig with AbstractConfig #16458
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@brandboat thanks for this patch
config.consumerGroupMigrationPolicy, | ||
config.offsetsTopicCompressionType | ||
) | ||
val groupCoordinatorConfig = new GroupCoordinatorConfig(config) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we add a method to KafkaConfig
to return GroupCoordinatorConfig
? With that change, we can remove all GroupCoordinatorConfig-related getters from KafkaConfig
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for the suggestion! I have added the method to KafkaConfig to return GroupCoordinatorConfig and removed all related getters. This change is included in the latest commits.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@brandboat thanks for this patch
@@ -2576,15 +2577,15 @@ public void testCleanupExpiredOffsetsWithPendingTransactionalOffsets() { | |||
|
|||
OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder() | |||
.withGroupMetadataManager(groupMetadataManager) | |||
.withOffsetsRetentionMs(1000) | |||
.withOffsetsRetentionMinutes(1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not sure why we need this change. Also, the value is changed from 1 second to 1 minute?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually the original time unit of config offsets.retention.minutes
is in minutes, and before this refactor we pass offsets.retention
in millis to GroupCoordinatorConfig constructor and use it in OffsetMetadataManagerTest, now we pass AbstractConfig (i.e. KafkaConfig) to GroupCoordinatorConfig, which means we should pass offsets.retention in minutes instead of ms
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, ConsumerGroupMigrationPolicy.DISABLED.name()); | ||
configs.put(GroupCoordinatorConfig.OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG, (int) CompressionType.NONE.id); | ||
|
||
return new GroupCoordinatorConfig(new GroupCoordinatorTestConfig(configs)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return new GroupCoordinatorConfig(new AbstractConfig(Utils.mergeConfigs(Arrays.asList(
GroupCoordinatorConfig.GROUP_COORDINATOR_CONFIG_DEF,
GroupCoordinatorConfig.NEW_GROUP_CONFIG_DEF,
GroupCoordinatorConfig.OFFSET_MANAGEMENT_CONFIG_DEF,
GroupCoordinatorConfig.CONSUMER_GROUP_CONFIG_DEF)),
configs, false));
WDYT? we don't need the temporary class
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will do, thanks for the suggestion.
val groupCoordinatorAppendLingerMs = getInt(GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG) | ||
|
||
/** Consumer group configs */ | ||
val consumerGroupSessionTimeoutMs = getInt(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@OmniaGM Could you please take a look? Does it follow your idea that we should move getters out of KafkaConfig
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry for late response here, this looks great! I think the validators could also be moved out as well
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, we will file a MINOR for it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@brandboat @chia7712 Thanks for the PR. Overall, I agree with the change. However, I have left a few suggestions for consideration. Please let me know what you think.
public ConsumerGroupMigrationPolicy consumerGroupMigrationPolicy() { | ||
return ConsumerGroupMigrationPolicy.parse( | ||
config.getString(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG)); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would like to point out that this change is a bit risky in my opinion because it does not ensure during the startup that the migration policy is really correct. If it fails somehow, it will fail later on when consumerGroupMigrationPolicy
is accessed for the first time. I wonder if we should keep local attributes and initialize them in the constructor.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The validation is addressed by the GroupCoordinatorConfig
's config definition.
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaConfig.scala#L182
Hence, the string value is valid in constructing GroupCoordinatorConfig
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Indeed. I was thinking about the case where the validation has a bug or is not good enough.
public int offsetCommitTimeoutMs() { | ||
return config.getInt(GroupCoordinatorConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is an example of the config which is accessed extremely frequently (there are other like this too). I think that having attributes would be better as it avoid having to look it up in the config every time.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a good question!
The approach of this PR is to make sure GroupCoordinatorConfig
can see the latest configs to avoid potential bugs (#16394). However, it has side effect which brings extra cost: "volatile" and "lookup/parse".
- The cost of "volatile" (or other similar sync trick) is required if we make
GroupCoordinatorConfig
see latest configs. - The cost of "lookup/parse" could be eliminated if we do a bit refactor for it. For example, we pass
Supplier<GroupCoordinatorConfig>
instead ofGroupCoordinatorConfig
to the callers. By that changes, we can makeGroupCoordinatorConfig
have all immutable pre-created local attributes. The impl ofSupplier<GroupCoordinatorConfig>
will be generated byKafkaConfig
and it looks likeAtomicReference::get
. However, the side effect is that the usage will get a little ugly: "config.numThreads()" -> "config.get().numThreads()"
WDYT? BTW, we had a related discussion in https://issues.apache.org/jira/browse/KAFKA-17001
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
None of the configs in this class are dynamic so I am not sure that it is worth it. We could perhaps mention it in the javadoc of the class. If we introduce a dynamic config, we should indeed not use an attribute for it.
For the context, in KafkaConfig, we always had the distinction between val (static values) and def (dynamic ones). We could do the same here, I suppose.
I double-check all configs and yes all of them are not dynamic. Maybe we don't need to be over-engineering for now. Hence, we can have a PR for following changes.
@brandboat @dajac @OmniaGM PTAL, I hope this can be a guideline for all similar config class |
@brandboat I open https://issues.apache.org/jira/browse/KAFKA-17081 as follow-up. And it is assigned to you. PLEASE feel free to assign it back to me if you have no bandwidth |
@chia7712 Thanks. Sounds good to me! |
related to https://issues.apache.org/jira/browse/KAFKA-16909
as title, this pr follows RemoteLogManagerConfig.java, pass AbstractConfig to constructor.
Committer Checklist (excluded from commit message)